{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Yes we need both these imports\n",
    "from pyspark.sql import SparkSession\n",
    "from pyspark.sql.functions import col, to_date\n",
    "from pyspark.sql.types import *\n",
    "from pyspark.sql.types import StructField, StructType\n",
    "from pyspark.sql.catalog import UserDefinedFunction\n",
    "import os"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fs_prefix = \"s3a://kf-book-examples/mailing-lists\" # Create with mc as in ch1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# See https://medium.com/@szinck/setting-up-pyspark-jupyter-and-minio-on-kubeflow-kubernetes-aab98874794f\n",
    "#tag::configurePythonVersion[]\n",
    "os.environ[\"PYSPARK_PYTHON\"] = \"python3.6\"\n",
    "#end::configurePythonVersion[]\n",
    "session = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"fetchMailingListData\")\n",
    "    .config(\"spark.executor.instances\", \"8\")\n",
    "    .config(\"spark.driver.memoryOverhead\", \"0.25\")\n",
    "    .config(\"spark.executor.memory\", \"6g\")\n",
    "    .config(\"spark.dynamicAllocation.enabled\", \"false\")\n",
    "    .config(\"spark.ui.enabled\", \"true\")\n",
    "    .config(\"spark.kubernetes.container.image\",\n",
    "           \"gcr.io/boos-demo-projects-are-rad/kubeflow/spark-worker/spark-py-36:v3.0.0-preview2-23\")\n",
    "    #tag::notebookSession[]\n",
    "    .config(\"spark.driver.bindAddress\", \"0.0.0.0\")\n",
    "    .config(\"spark.kubernetes.namespace\", \"kubeflow-programmerboo\")\n",
    "    .config(\"spark.master\", \"k8s://https://kubernetes.default\")\n",
    "    .config(\"spark.driver.host\", \n",
    "            \"spark-driver.kubeflow-programmerboo.svc.cluster.local\")\n",
    "    .config(\"spark.kubernetes.executor.annotation.sidecar.istio.io/inject\",\n",
    "            \"false\")\n",
    "    .config(\"spark.driver.port\", \"39235\")\n",
    "    .config(\"spark.blockManager.port\", \"39236\")\n",
    "    #end::notebookSession[]\n",
    "    # If using minio - see https://github.com/minio/cookbook/blob/master/docs/apache-spark-with-minio.md\n",
    "    #tag::minio[]\n",
    "    .config(\"spark.hadoop.fs.s3a.endpoint\",\n",
    "            \"minio-service.kubeflow.svc.cluster.local:9000\")\n",
    "    .config(\"fs.s3a.connection.ssl.enabled\", \"false\")\n",
    "    .config(\"fs.s3a.path.style.access\", \"true\")\n",
    "    # You can also add an account using the minio command as described in chapter 1\n",
    "    .config(\"spark.hadoop.fs.s3a.access.key\", \"minio\")\n",
    "    .config(\"spark.hadoop.fs.s3a.secret.key\", \"minio123\")\n",
    "    #end::minio[]\n",
    "    ).getOrCreate()\n",
    "sc = session.sparkContext"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Data fetch pipeline: Download mailing list data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "list_name=\"spark-user\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mailing_list_template=\"http://mail-archives.apache.org/mod_mbox/{list_name}/{date}.mbox\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Generate the possible dates"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "start_year=2019 # Change to 2002 once you've verified\n",
    "end_year=2021\n",
    "dates = [\"{:d}{:02d}\".format(year, month) for year in range(start_year, end_year) for month in range (1,12)]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def download_emails(date):\n",
    "    import subprocess\n",
    "    from mailbox import mbox\n",
    "    import os\n",
    "    mbox_filename = \"{date}.mbox\".format(date=date)\n",
    "    url=mailing_list_template.format(list_name=list_name,date=date)\n",
    "    subprocess.call([\"wget\", url])\n",
    "    # Skip years that don't exist\n",
    "    if not os.path.exists(mbox_filename):\n",
    "        return []\n",
    "    mail = mbox(mbox_filename.format(date=date), create=False)\n",
    "    # LC the keys since the casing is non-consistent\n",
    "    def get_body(message):\n",
    "        content_type = message.get_content_type()\n",
    "        # Multi-part messages\n",
    "        if message.is_multipart():\n",
    "            return \"\".join(map(get_body, message.get_payload()))\n",
    "        elif \"text\" in content_type or \"html\" in content_type:\n",
    "            return message.get_payload()\n",
    "        else:\n",
    "            return \"\"\n",
    "    def message_to_dict(message):\n",
    "        ret = dict((k.lower(), v) for k, v in message.items())\n",
    "        ret[\"multipart\"] = message.is_multipart()\n",
    "        ret[\"body\"] = get_body(message)\n",
    "        return ret\n",
    "    emails = list(map(message_to_dict, mail.itervalues()))\n",
    "    os.remove(mbox_filename)\n",
    "    return emails"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: test that it works locally\n",
    "# download_emails(\"202001\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "emails_rdd = sc.parallelize(dates).flatMap(download_emails).cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "emails_rdd.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mailing_list_posts_mbox_df = emails_rdd.toDF(sampleRatio=1.0)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cached = mailing_list_posts_mbox_df.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mailing_list_posts_mbox_df.select(\"list-id\", \"In-Reply-To\").take(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_mailing_list_data = mailing_list_posts_mbox_df.filter(\n",
    "    mailing_list_posts_mbox_df[\"list-id\"].contains(\"spark\")).repartition(60).cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_mailing_list_data.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_mailing_list_data.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def extract_date_from_email_datefield(datefield):\n",
    "    if datefield is None:\n",
    "        return None\n",
    "    from datetime import datetime\n",
    "    import time\n",
    "    import email.utils\n",
    "    parsed_date = email.utils.parsedate(datefield)\n",
    "    return datetime.fromtimestamp(time.mktime((parsed_date)))\n",
    "\n",
    "\n",
    "extract_date_from_email_datefield_udf = UserDefinedFunction(\n",
    "    extract_date_from_email_datefield, StringType(), \"extract_date_from_email_datefield\")\n",
    "\n",
    "session.catalog._jsparkSession.udf().registerPython(\n",
    "    \"extract_date_from_email_datefield\",\n",
    "    extract_date_from_email_datefield_udf._judf)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_mailing_list_data_with_date = spark_mailing_list_data.select(\n",
    "    \"*\",\n",
    "    extract_date_from_email_datefield_udf(spark_mailing_list_data[\"Date\"]).alias(\"email_date\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Manually verify that our date parser is looking ok\n",
    "spark_mailing_list_data.select(spark_mailing_list_data[\"Date\"],\n",
    "                               extract_date_from_email_datefield_udf(spark_mailing_list_data[\"Date\"]).alias(\"email_date\")\n",
    "                              ).take(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::filter_junk[]\n",
    "def is_ok(post):\n",
    "    # Your special business logic goes here\n",
    "    return True\n",
    "spark_mailing_list_data_cleaned = spark_mailing_list_data_with_date.filter(is_ok)\n",
    "#end::filter_junk[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mailing_list_posts_in_reply_to = spark_mailing_list_data_cleaned.filter(\n",
    "    spark_mailing_list_data[\"In-Reply-To\"].isNotNull()).alias(\"mailing_list_posts_in_reply_to\")\n",
    "initial_posts = spark_mailing_list_data_cleaned.filter(\n",
    "    spark_mailing_list_data[\"In-Reply-To\"].isNull()).alias(\"initial_posts\").cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# See how many start-of-thread posts we have\n",
    "initial_posts.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ids_in_reply = mailing_list_posts_in_reply_to.select(\"In-Reply-To\", \"message-id\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ids_in_reply.schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Ok now it's time to save these\n",
    "#tag::write_big_data[]\n",
    "initial_posts.write.format(\"parquet\").mode('overwrite').save(fs_prefix + \"/initial_posts\")\n",
    "ids_in_reply.write.format(\"parquet\").mode('overwrite').save(fs_prefix + \"/ids_in_reply\")\n",
    "#end::write_big_data[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#tag::small_data[]\n",
    "initial_posts.toPandas()\n",
    "#end::small_data[]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "session.stop()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
